package xmq

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"

	"gitee.com/xfrm/middleware/xmq/pub"
	"gitee.com/xfrm/middleware/xmq/pulsar"

	"gitee.com/xfrm/middleware/xcontext"
	"gitee.com/xfrm/middleware/xlog"
	"gitee.com/xfrm/middleware/xmq/delay"
	"gitee.com/xfrm/middleware/xtime"
	"gitee.com/xfrm/middleware/xtrace"
)

// Package-level string constants.
//
// traceComponent is the component label "xmq" and defaultRouteGroup is the
// literal route-group name "default". The functions visible in this file pass
// pub.DefaultRouteGroup when resolving the route group, so these constants are
// presumably referenced elsewhere in the package — verify before removing.
const (
	traceComponent    = "xmq"
	defaultRouteGroup = "default"
)

// mqOpDurationLimit is the slow-operation threshold (10ms): any read/write in
// this file whose measured duration exceeds it is reported through xlog.Infof.
var mqOpDurationLimit = 10 * time.Millisecond

// Message is a key/value pair accepted by WriteMsgs for batch publishing.
// Key is the message key and Value is the message body; WriteMsgs passes the
// messages through generateMsgsPayload and then JSON-encodes each resulting
// Value before writing.
type Message struct {
	Key   string
	Value interface{}
}

// WriteMsg publishes a single keyed message to the given kafka topic.
//
// The writer is obtained from defaultInstanceManager using the route group
// resolved from ctx, and value is wrapped by generatePayload before it is
// handed to the writer. An error is returned when no writer is available,
// when the payload cannot be generated, or when the write itself fails.
func WriteMsg(ctx context.Context, topic string, key string, value interface{}) error {
	const op = "mq.WriteMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.WriteMsg")
	defer span.Finish()

	writer := defaultInstanceManager.getKafkaWriter(ctx, &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeKafkaWriter,
		topic:     topic,
		groupId:   "",
		partition: 0,
	})
	if writer == nil {
		return fmt.Errorf("%s, getWriter err, topic: %s", op, topic)
	}

	payload, genErr := generatePayload(ctx, value)
	if genErr != nil {
		return fmt.Errorf("%s, GeneratePayload err: %v, topic: %s", op, genErr, topic)
	}

	// The timer starts after the payload is built, so only the write is measured.
	timer := xtime.NewTimeStat()
	defer func() {
		if elapsed := timer.Duration(); elapsed > mqOpDurationLimit {
			xlog.Infof(ctx, "%s slow topic:%s dur:%d", op, topic, elapsed)
		}
		pub.StatReqDuration(ctx, topic, "xmq.WriteMsg", pub.TraceMessageBusTypeKafka, timer.Millisecond())
	}()

	return writer.WriteMsg(ctx, key, payload)
}

// WriteMsgs writes a batch of messages to the given kafka topic.
//
// The messages are first passed through generateMsgsPayload; each resulting
// value is then JSON-encoded and sent together in a single writer.WriteMsgs
// call. An error is returned when no writer is available, when payload
// generation or JSON encoding fails (nothing is written in that case), or
// when the write itself fails.
func WriteMsgs(ctx context.Context, topic string, msgs ...Message) error {
	fun := "mq.WriteMsgs -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.WriteMsgs")
	defer span.Finish()

	conf := &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeKafkaWriter,
		topic:     topic,
		groupId:   "",
		partition: 0,
	}
	writer := defaultInstanceManager.getKafkaWriter(ctx, conf)
	if writer == nil {
		return fmt.Errorf("%s, getWriter err, topic: %s", fun, topic)
	}

	nmsgs, err := generateMsgsPayload(ctx, msgs...)
	if err != nil {
		return fmt.Errorf("%s, generateMsgsPayload err: %v, topic: %s", fun, err, topic)
	}

	// The final length is known up front, so allocate once.
	kmsgs := make([]kafka.Message, 0, len(nmsgs))
	for _, msg := range nmsgs {
		body, err := json.Marshal(msg.Value)
		if err != nil {
			// Add the same context as the other error paths; %w keeps the
			// underlying json error reachable through errors.As/Is.
			return fmt.Errorf("%s, json.Marshal err: %w, topic: %s", fun, err, topic)
		}
		kmsgs = append(kmsgs, kafka.Message{
			Key:   []byte(msg.Key),
			Value: body,
		})
	}

	// Only the write itself is timed; payload generation and encoding are excluded.
	st := xtime.NewTimeStat()
	defer func() {
		dur := st.Duration()
		if dur > mqOpDurationLimit {
			xlog.Infof(ctx, "%s slow topic:%s dur:%d", fun, topic, dur)
		}
		pub.StatReqDuration(ctx, topic, "xmq.WriteMsgs", pub.TraceMessageBusTypeKafka, st.Millisecond())
	}()

	return writer.WriteMsgs(ctx, kmsgs)
}

// ReadMsgByGroup reads one message from the kafka topic as consumer group
// groupId. The offset is committed automatically once the message is read.
//
// value is handed to the reader and, when the message carries a non-empty
// payload body, to parsePayload as the decode destination. In that case the
// context returned by parsePayload is returned; otherwise the span context
// started here is returned.
func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, error) {
	const op = "mq.ReadMsgByGroup -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.ReadMsgByGroup")
	defer span.Finish()

	reader := defaultInstanceManager.getKafkaReader(ctx, &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeKafkaReader,
		topic:     topic,
		groupId:   groupId,
		partition: 0,
	})
	if reader == nil {
		return ctx, fmt.Errorf("%s, getReader err, topic: %s", op, topic)
	}

	var envelope Payload
	timer := xtime.NewTimeStat()
	readErr := reader.ReadMsg(ctx, &envelope, value)

	// Duration is logged and recorded before the error check, so failed
	// reads are counted as well.
	if elapsed := timer.Duration(); elapsed > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s groupId:%s dur:%d", op, topic, groupId, elapsed)
	}
	pub.StatReqDuration(ctx, topic, "xmq.ReadMsgByGroup", pub.TraceMessageBusTypeKafka, timer.Millisecond())

	switch {
	case readErr != nil:
		return ctx, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", op, readErr, topic)
	case len(envelope.Value) == 0:
		return ctx, nil
	}

	msgCtx, parseErr := parsePayload(&envelope, "mq.ReadMsgByGroup", value)
	if msgSpan := xtrace.SpanFromContext(msgCtx); msgSpan != nil {
		defer msgSpan.Finish()
	}
	return msgCtx, parseErr
}

// ReadMsgByPartition reads one message from a specific partition of the kafka
// topic, using a reader configured with an empty consumer group ID.
//
// value is handed to the reader and, when the message carries a non-empty
// payload body, to parsePayload as the decode destination. In that case the
// context returned by parsePayload is returned; otherwise the span context
// started here is returned.
func ReadMsgByPartition(ctx context.Context, topic string, partition int, value interface{}) (context.Context, error) {
	const op = "mq.ReadMsgByPartition -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.ReadMsgByPartition")
	defer span.Finish()

	reader := defaultInstanceManager.getKafkaReader(ctx, &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeKafkaReader,
		topic:     topic,
		groupId:   "",
		partition: partition,
	})
	if reader == nil {
		return ctx, fmt.Errorf("%s, getReader err, topic: %s", op, topic)
	}

	var envelope Payload
	timer := xtime.NewTimeStat()
	readErr := reader.ReadMsg(ctx, &envelope, value)

	// Duration is logged and recorded before the error check, so failed
	// reads are counted as well.
	if elapsed := timer.Duration(); elapsed > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s partition:%d dur:%d", op, topic, partition, elapsed)
	}
	pub.StatReqDuration(ctx, topic, "xmq.ReadMsgByPartition", pub.TraceMessageBusTypeKafka, timer.Millisecond())

	if readErr != nil {
		return ctx, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", op, readErr, topic)
	}
	if len(envelope.Value) == 0 {
		// No payload body to parse; keep the span context.
		return ctx, nil
	}

	msgCtx, parseErr := parsePayload(&envelope, "mq.ReadMsgByPartition", value)
	if msgSpan := xtrace.SpanFromContext(msgCtx); msgSpan != nil {
		defer msgSpan.Finish()
	}
	return msgCtx, parseErr
}

// FetchMsgByGroup fetches one message from the kafka topic as consumer group
// groupId WITHOUT committing the offset automatically; the caller must commit
// it manually by calling CommitMsg on the returned handler.
//
// value is handed to the reader and, when the message carries a non-empty
// payload body, to parsePayload as the decode destination. In that case the
// context returned by parsePayload is returned; otherwise the span context
// started here is returned. The handler is nil when the fetch itself fails.
func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, pub.Handler, error) {
	const op = "mq.FetchMsgByGroup -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.FetchMsgByGroup")
	defer span.Finish()

	reader := defaultInstanceManager.getKafkaReader(ctx, &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeKafkaReader,
		topic:     topic,
		groupId:   groupId,
		partition: 0,
	})
	if reader == nil {
		return ctx, nil, fmt.Errorf("%s, getReader err, topic: %s", op, topic)
	}

	var envelope Payload
	timer := xtime.NewTimeStat()
	handler, fetchErr := reader.FetchMsg(ctx, &envelope, value)

	// Duration is logged and recorded before the error check, so failed
	// fetches are counted as well.
	if elapsed := timer.Duration(); elapsed > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s groupId:%s dur:%d", op, topic, groupId, elapsed)
	}
	pub.StatReqDuration(ctx, topic, "xmq.FetchMsgByGroup", pub.TraceMessageBusTypeKafka, timer.Millisecond())

	switch {
	case fetchErr != nil:
		return ctx, nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", op, fetchErr, topic)
	case len(envelope.Value) == 0:
		return ctx, handler, nil
	}

	msgCtx, parseErr := parsePayload(&envelope, "mq.FetchMsgByGroup", value)
	if msgSpan := xtrace.SpanFromContext(msgCtx); msgSpan != nil {
		defer msgSpan.Finish()
	}
	return msgCtx, handler, parseErr
}

// WriteDelayMsg enqueues value on the delay queue for topic with the given
// delay in seconds and returns the ID of the created job.
//
// value is wrapped by generatePayload before being written. jobID is empty
// when no delay client is available or the payload cannot be generated.
func WriteDelayMsg(ctx context.Context, topic string, value interface{}, delaySeconds uint32) (jobID string, err error) {
	const op = "mq.WriteDelayMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.WriteDelayMsg")
	defer span.Finish()

	client := defaultInstanceManager.getDelayClient(ctx, &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeDelayClient,
		topic:     topic,
		groupId:   "",
		partition: 0,
	})
	if client == nil {
		return "", fmt.Errorf("%s, getDelayClient nil, topic: %s", op, topic)
	}

	payload, genErr := generatePayload(ctx, value)
	if genErr != nil {
		return "", fmt.Errorf("%s, GeneratePayload err: %v, topic: %s", op, genErr, topic)
	}

	// The timer starts after the payload is built, so only WriteJob is measured.
	timer := xtime.NewTimeStat()
	defer func() {
		if elapsed := timer.Duration(); elapsed > mqOpDurationLimit {
			xlog.Infof(ctx, "%s slow topic:%s dur:%d", op, topic, elapsed)
		}
		pub.StatReqDuration(ctx, topic, "xmq.WriteDelayMsg", pub.TraceMessageBusTypeDelay, timer.Millisecond())
	}()

	return client.WriteJob(ctx, payload, delaySeconds)
}

// FetchDelayMsg reads one job from the delay queue for topic WITHOUT
// acknowledging it. The returned pub.AckHandler is built from the client and
// the job ID so the caller can confirm the job afterwards.
//
// The job body is unmarshalled both into a Payload envelope and directly into
// value. When the envelope's Value is non-empty, parsePayload is then run
// against value and its context is returned instead of the span context
// started here.
//
// Errors: a nil handler is returned when the client is unavailable, the read
// fails, or the body cannot be unmarshalled. A parsePayload failure is
// returned together with the handler, so the caller can still decide how to
// treat the job.
func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (context.Context, pub.AckHandler, error) {
	fun := "mq.FetchDelayMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.FetchDelayMsg")
	defer span.Finish()

	conf := &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeDelayClient,
		topic:     topic,
		groupId:   "",
		partition: 0,
	}
	client := defaultInstanceManager.getDelayClient(ctx, conf)
	if client == nil {
		err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic)
		return ctx, nil, err
	}

	var payload Payload
	st := xtime.NewTimeStat()

	job, err := client.ReadJob(ctx)
	if err != nil {
		return ctx, nil, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic)
	}
	err = json.Unmarshal(job.Body, &payload)
	if err != nil {
		return ctx, nil, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
	}
	// value is an interface; encoding/json follows a non-nil pointer stored
	// in it, so a pointer passed by the caller is what gets populated.
	err = json.Unmarshal(job.Body, &value)
	if err != nil {
		return ctx, nil, err
	}

	handler := delay.NewDelayHandler(client, job.ID)

	// Duration is only logged/recorded once the read and both decodes succeeded.
	dur := st.Duration()
	if dur > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s  dur:%d", fun, topic, dur)
	}

	pub.StatReqDuration(ctx, topic, "xmq.FetchDelayMsg", pub.TraceMessageBusTypeDelay, st.Millisecond())
	if len(payload.Value) == 0 {
		return ctx, handler, nil
	}
	mctx, err := parsePayload(&payload, "mq.FetchDelayMsg", value)
	mspan := xtrace.SpanFromContext(mctx)
	if mspan != nil {
		defer mspan.Finish()
	}
	// Report a parsePayload failure instead of silently returning nil, as the
	// kafka and pulsar fetch functions do.
	return mctx, handler, err
}

// FetchDelayMsgAndID reads one job from the delay queue for topic WITHOUT
// acknowledging it, and additionally returns the job ID. The returned
// pub.AckHandler is built from the client and the job ID so the caller can
// confirm the job afterwards.
//
// The job body is unmarshalled both into a Payload envelope and directly into
// value. When the envelope's Value is non-empty, parsePayload is then run
// against value and its context is returned instead of the span context
// started here.
//
// Errors: the job ID is empty when the client is unavailable or the read
// fails; once a job has been read its ID is returned even on decode errors
// (with a nil handler). A parsePayload failure is returned together with the
// handler and job ID.
func FetchDelayMsgAndID(ctx context.Context, topic string, value interface{}) (context.Context, pub.AckHandler, string, error) {
	fun := "mq.FetchDelayMsgAndID -->"

	// NOTE: the span and parsePayload operation names are "mq.FetchDelayMsg",
	// the same as FetchDelayMsg; they are left unchanged here.
	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.FetchDelayMsg")
	defer span.Finish()

	conf := &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeDelayClient,
		topic:     topic,
		groupId:   "",
		partition: 0,
	}
	client := defaultInstanceManager.getDelayClient(ctx, conf)
	if client == nil {
		err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic)
		return ctx, nil, "", err
	}

	var payload Payload
	st := xtime.NewTimeStat()

	job, err := client.ReadJob(ctx)
	if err != nil {
		return ctx, nil, "", fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic)
	}
	err = json.Unmarshal(job.Body, &payload)
	if err != nil {
		return ctx, nil, job.ID, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
	}
	// value is an interface; encoding/json follows a non-nil pointer stored
	// in it, so a pointer passed by the caller is what gets populated.
	err = json.Unmarshal(job.Body, &value)
	if err != nil {
		return ctx, nil, job.ID, err
	}

	handler := delay.NewDelayHandler(client, job.ID)

	// Duration is only logged/recorded once the read and both decodes succeeded.
	dur := st.Duration()
	if dur > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s  dur:%d", fun, topic, dur)
	}

	pub.StatReqDuration(ctx, topic, "xmq.FetchDelayMsgAndID", pub.TraceMessageBusTypeDelay, st.Millisecond())
	if len(payload.Value) == 0 {
		return ctx, handler, job.ID, nil
	}
	mctx, err := parsePayload(&payload, "mq.FetchDelayMsg", value)
	mspan := xtrace.SpanFromContext(mctx)
	if mspan != nil {
		defer mspan.Finish()
	}
	// Report a parsePayload failure instead of silently returning nil, as the
	// kafka and pulsar fetch functions do.
	return mctx, handler, job.ID, err
}

// ReadDelayMsg reads one job from the delay queue for topic and acknowledges
// it automatically via client.Ack.
//
// The job body is unmarshalled both into a Payload envelope and directly into
// value. After the ack, when the envelope's Value is non-empty, parsePayload
// is run against value and its context is returned instead of the span
// context started here.
//
// Errors: returned when the client is unavailable, the read fails, the body
// cannot be unmarshalled (the job is NOT acked in these cases), the ack
// fails, or parsePayload fails. A parsePayload error is reported after the
// job has already been acked.
func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context.Context, error) {
	fun := "mq.ReadDelayMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.ReadDelayMsg")
	defer span.Finish()

	conf := &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeDelayClient,
		topic:     topic,
		groupId:   "",
		partition: 0,
	}
	client := defaultInstanceManager.getDelayClient(ctx, conf)
	if client == nil {
		err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic)
		return ctx, err
	}

	var payload Payload
	st := xtime.NewTimeStat()

	job, err := client.ReadJob(ctx)
	if err != nil {
		return ctx, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic)
	}
	err = json.Unmarshal(job.Body, &payload)
	if err != nil {
		return ctx, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
	}
	// value is an interface; encoding/json follows a non-nil pointer stored
	// in it, so a pointer passed by the caller is what gets populated.
	err = json.Unmarshal(job.Body, &value)
	if err != nil {
		return ctx, err
	}

	// Duration covers the read and decode only; the ack below is not timed.
	dur := st.Duration()
	if dur > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s  dur:%d", fun, topic, dur)
	}

	pub.StatReqDuration(ctx, topic, "xmq.ReadDelayMsg", pub.TraceMessageBusTypeDelay, st.Millisecond())
	err = client.Ack(ctx, job.ID)
	if err != nil {
		return ctx, err
	}
	if len(payload.Value) == 0 {
		return ctx, nil
	}
	mctx, err := parsePayload(&payload, "mq.ReadDelayMsg", value)
	mspan := xtrace.SpanFromContext(mctx)
	if mspan != nil {
		defer mspan.Finish()
	}
	// Report a parsePayload failure instead of silently returning nil, as the
	// kafka and pulsar read functions do.
	return mctx, err
}

// ReadDelayMsgAndID reads one job from the delay queue for topic,
// acknowledges it automatically via client.Ack, and returns the job ID.
//
// The job body is unmarshalled both into a Payload envelope and directly into
// value. After the ack, when the envelope's Value is non-empty, parsePayload
// is run against value and its context is returned instead of the span
// context started here.
//
// Errors: the job ID is empty when the client is unavailable or the read
// fails; once a job has been read its ID is returned on every path, including
// decode, ack and parsePayload failures. A parsePayload error is reported
// after the job has already been acked.
func ReadDelayMsgAndID(ctx context.Context, topic string, value interface{}) (context.Context, string, error) {
	fun := "mq.ReadDelayMsgAndID -->"

	// NOTE: the span and parsePayload operation names are "mq.ReadDelayMsg",
	// the same as ReadDelayMsg; they are left unchanged here.
	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.ReadDelayMsg")
	defer span.Finish()

	conf := &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeDelayClient,
		topic:     topic,
		groupId:   "",
		partition: 0,
	}
	client := defaultInstanceManager.getDelayClient(ctx, conf)
	if client == nil {
		err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic)
		return ctx, "", err
	}

	var payload Payload
	st := xtime.NewTimeStat()

	job, err := client.ReadJob(ctx)
	if err != nil {
		return ctx, "", fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic)
	}
	err = json.Unmarshal(job.Body, &payload)
	if err != nil {
		return ctx, job.ID, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
	}
	// value is an interface; encoding/json follows a non-nil pointer stored
	// in it, so a pointer passed by the caller is what gets populated.
	err = json.Unmarshal(job.Body, &value)
	if err != nil {
		return ctx, job.ID, err
	}

	// Duration covers the read and decode only; the ack below is not timed.
	dur := st.Duration()
	if dur > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s  dur:%d", fun, topic, dur)
	}

	pub.StatReqDuration(ctx, topic, "xmq.ReadDelayMsgAndID", pub.TraceMessageBusTypeDelay, st.Millisecond())
	err = client.Ack(ctx, job.ID)
	if err != nil {
		return ctx, job.ID, err
	}
	if len(payload.Value) == 0 {
		return ctx, job.ID, nil
	}
	mctx, err := parsePayload(&payload, "mq.ReadDelayMsg", value)
	mspan := xtrace.SpanFromContext(mctx)
	if mspan != nil {
		defer mspan.Finish()
	}
	// Report a parsePayload failure instead of silently returning nil, as the
	// kafka and pulsar read functions do.
	return mctx, job.ID, err
}

// WritePulsarMsg synchronously writes a single keyed message to the pulsar
// topic and returns the pulsar.MessageID reported by the producer.
//
// value is wrapped by generatePayload before being handed to the producer.
// A nil MessageID and an error are returned when no producer is available or
// the payload cannot be generated; otherwise the producer's result is
// returned as-is.
func WritePulsarMsg(ctx context.Context, topic string, key string, value interface{}) (pulsar.MessageID, error) {
	// The label previously read "mq.WritePulsarMsgs", which names a function
	// that does not exist and made errors/slow logs misleading.
	fun := "mq.WritePulsarMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.WritePulsarMsg")
	defer span.Finish()

	conf := &instanceConf{
		group: xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:  RoleTypePulsarProducer,
		topic: topic,
	}
	producer := defaultInstanceManager.getPulsarProducer(ctx, conf)
	if producer == nil {
		return nil, fmt.Errorf("%s, getProducer err, topic: %s", fun, topic)
	}

	payload, err := generatePayload(ctx, value)
	if err != nil {
		return nil, fmt.Errorf("%s, generatePayload err: %v, topic: %s", fun, err, topic)
	}

	// Only a slow-operation log is emitted here; unlike the kafka writers, no
	// pub.StatReqDuration metric is recorded for pulsar writes.
	st := xtime.NewTimeStat()
	defer func() {
		dur := st.Duration()
		if dur > mqOpDurationLimit {
			xlog.Infof(ctx, "%s slow topic:%s dur:%d", fun, topic, dur)
		}
	}()

	return producer.WriteMsg(ctx, key, payload)
}

// WritePulsarAsyncMsg hands a single keyed message to the pulsar producer's
// asynchronous write path, passing callback through to producer.WriteAsyncMsg.
//
// value is wrapped by generatePayload first. An error is returned when no
// producer is available, when the payload cannot be generated, or when
// WriteAsyncMsg itself returns one.
func WritePulsarAsyncMsg(ctx context.Context, topic string, key string, value interface{}, callback func(pulsar.MessageID, *pulsar.ProducerMessage, error)) error {
	const op = "mq.WritePulsarAsyncMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.WritePulsarAsyncMsg")
	defer span.Finish()

	producer := defaultInstanceManager.getPulsarProducer(ctx, &instanceConf{
		group: xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:  RoleTypePulsarProducer,
		topic: topic,
	})
	if producer == nil {
		return fmt.Errorf("%s, getProducer err, topic: %s", op, topic)
	}

	payload, genErr := generatePayload(ctx, value)
	if genErr != nil {
		return fmt.Errorf("%s, generatePayload err: %v, topic: %s", op, genErr, topic)
	}

	// The measured time is that of the WriteAsyncMsg call itself.
	timer := xtime.NewTimeStat()
	defer func() {
		if elapsed := timer.Duration(); elapsed > mqOpDurationLimit {
			xlog.Infof(ctx, "%s slow topic:%s dur:%d", op, topic, elapsed)
		}
	}()

	return producer.WriteAsyncMsg(ctx, key, payload, callback)
}

// ReadPulsarMsg reads one message from the pulsar topic using the consumer
// identified by groupId.
//
// value is handed to the consumer and, when the message carries a non-empty
// payload body, to parsePayload as the decode destination. In that case the
// context returned by parsePayload is returned; otherwise the span context
// started here is returned.
func ReadPulsarMsg(ctx context.Context, topic, groupId string, value interface{}) (context.Context, error) {
	const op = "mq.ReadPulsarMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.ReadPulsarMsg")
	defer span.Finish()

	consumer := defaultInstanceManager.getPulsarConsumer(ctx, &instanceConf{
		group:   xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:    RoleTypePulsarConsumer,
		topic:   topic,
		groupId: groupId,
	})
	if consumer == nil {
		return ctx, fmt.Errorf("%s, getPulsarConsumer err, topic: %s", op, topic)
	}

	var envelope Payload
	timer := xtime.NewTimeStat()
	readErr := consumer.ReadMsg(ctx, &envelope, value)

	// Duration is logged and recorded before the error check, so failed
	// reads are counted as well.
	if elapsed := timer.Duration(); elapsed > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s groupId:%s dur:%d", op, topic, groupId, elapsed)
	}
	pub.StatReqDuration(ctx, topic, "xmq.ReadPulsarMsg", pub.TraceMessageBusTypePulsar, timer.Millisecond())

	switch {
	case readErr != nil:
		return ctx, fmt.Errorf("%s, ReadPulsarMsg err: %v, topic: %s", op, readErr, topic)
	case len(envelope.Value) == 0:
		return ctx, nil
	}

	msgCtx, parseErr := parsePayload(&envelope, "mq.ReadPulsarMsg", value)
	if msgSpan := xtrace.SpanFromContext(msgCtx); msgSpan != nil {
		defer msgSpan.Finish()
	}
	return msgCtx, parseErr
}

// FetchPulsarMsg fetches one message from the pulsar topic using the consumer
// identified by groupId and returns a pub.AckHandler for it alongside the
// message context.
//
// value is handed to the consumer and, when the message carries a non-empty
// payload body, to parsePayload as the decode destination. In that case the
// context returned by parsePayload is returned; otherwise the span context
// started here is returned. The handler is nil when the fetch itself fails.
func FetchPulsarMsg(ctx context.Context, topic, groupId string, value interface{}) (context.Context, pub.AckHandler, error) {
	fun := "mq.FetchPulsarMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.FetchPulsarMsg")
	defer span.Finish()

	conf := &instanceConf{
		// Resolve the route group the same way as ReadPulsarMsg and every
		// other entry point in this file. This was the only call site using
		// GetControlRouteGroupWithDefault, so Fetch and Read could select
		// different consumer instances for the same topic/groupId.
		// NOTE(review): confirm the two helpers are meant to be interchangeable here.
		group:   xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:    RoleTypePulsarConsumer,
		topic:   topic,
		groupId: groupId,
	}
	consumer := defaultInstanceManager.getPulsarConsumer(ctx, conf)
	if consumer == nil {
		return ctx, nil, fmt.Errorf("%s, getPulsarConsumer err, topic: %s", fun, topic)
	}

	var payload Payload
	st := xtime.NewTimeStat()

	handler, err := consumer.FetchMsg(ctx, &payload, value)

	// Duration is logged and recorded before the error check, so failed
	// fetches are counted as well.
	dur := st.Duration()
	if dur > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s groupId:%s dur:%d", fun, topic, groupId, dur)
	}

	pub.StatReqDuration(ctx, topic, "xmq.FetchPulsarMsg", pub.TraceMessageBusTypePulsar, st.Millisecond())
	if err != nil {
		return ctx, nil, fmt.Errorf("%s, FetchPulsarMsg err: %v, topic: %s", fun, err, topic)
	}

	if len(payload.Value) == 0 {
		return ctx, handler, nil
	}

	mctx, err := parsePayload(&payload, "mq.FetchPulsarMsg", value)
	mspan := xtrace.SpanFromContext(mctx)
	if mspan != nil {
		defer mspan.Finish()
	}
	return mctx, handler, err
}

// DeleteDelayMsg removes a job from the delay queue for topic by
// acknowledging it with client.Ack, where msgID is the job ID.
//
// An error is returned when no delay client is available for the topic or
// when the ack fails.
func DeleteDelayMsg(ctx context.Context, topic, msgID string) (err error) {
	span, ctx := xtrace.StartSpanFromContext(ctx, "mq.DeleteDelayMsg")
	defer span.Finish()

	client := defaultInstanceManager.getDelayClient(ctx, &instanceConf{
		group:     xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:      RoleTypeDelayClient,
		topic:     topic,
		groupId:   "",
		partition: 0,
	})
	if client != nil {
		return client.Ack(ctx, msgID)
	}
	return fmt.Errorf("%s, getDelayClient nil, topic: %s", "mq.DeleteDelayMsg -->", topic)
}

// Close delegates to defaultInstanceManager.Close.
//
// TODO: work out why this logic exists and whether it should be retired.
func Close() {
	defaultInstanceManager.Close()
}
